package io.rsocket.resume;

import com.intuit.logging.ILConstants;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.internal.ClientServerInputMultiplexer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* loaded from: classes4.dex */
/**
 * Client-side {@link RSocketSession} built around a {@link ResumableDuplexConnection}.
 *
 * <p>Whenever the resumable connection reports a connection error, the session subscribes to the
 * connection source supplied through {@link #continueWith(Mono)}, re-attaches the resulting
 * connection, sends a RESUME frame carrying the resume token and the current positions, and hands
 * the first frame received on the setup connection to {@link #resumeWith(ByteBuf)}.
 *
 * <p>The resume token is retained in {@link #resumeToken(ByteBuf)} and released once
 * {@code onClose()} terminates.
 */
public class ClientRSocketSession implements RSocketSession<Mono<DuplexConnection>> {
    private static final Logger logger = LoggerFactory.getLogger(ClientRSocketSession.class);

    /** Allocator taken from the initial connection; used to encode RESUME and ERROR frames. */
    private final ByteBufAllocator allocator;

    /** Source of replacement connections; set through {@link #continueWith(Mono)}. */
    private volatile Mono<DuplexConnection> newConnection;

    private final ResumableDuplexConnection resumableConnection;

    /** Token sent in RESUME frames; set (and retained) through {@link #resumeToken(ByteBuf)}. */
    private volatile ByteBuf resumeToken;

    /**
     * Minimal {@link Retry.RetrySignal} that only carries the failure; both retry counters are
     * reported as zero.
     */
    public static class RetrySignal implements Retry.RetrySignal {
        private final Throwable ex;

        RetrySignal(Throwable th) {
            this.ex = th;
        }

        @Override // reactor.util.retry.Retry.RetrySignal
        public Throwable failure() {
            return this.ex;
        }

        @Override // reactor.util.retry.Retry.RetrySignal
        public long totalRetries() {
            return 0L;
        }

        @Override // reactor.util.retry.Retry.RetrySignal
        public long totalRetriesInARow() {
            return 0L;
        }
    }

    /**
     * Creates the session and wires up the reconnect-on-error pipeline.
     *
     * @param duplexConnection initial connection; also the source of the frame allocator
     * @param duration timeout applied to each attempt to obtain a new connection
     * @param retry retry strategy applied while obtaining a new connection
     * @param resumableFramesStore passed through to the {@link ResumableDuplexConnection}
     * @param duration2 passed through to the {@link ResumableDuplexConnection}
     * @param z passed through to the {@link ResumableDuplexConnection}
     */
    public ClientRSocketSession(DuplexConnection duplexConnection, final Duration duration, final Retry retry, ResumableFramesStore resumableFramesStore, Duration duration2, boolean z) {
        this.allocator = duplexConnection.alloc();
        this.resumableConnection = new ResumableDuplexConnection("client", duplexConnection, resumableFramesStore, duration2, z);

        // Drop the reference taken in resumeToken(ByteBuf) once the session terminates.
        onClose().doFinally(signal -> releaseResumeToken()).subscribe();

        this.resumableConnection
                .connectionErrors()
                .flatMap(err -> lambda$new$1(this, retry, duration, err))
                .map(ClientServerInputMultiplexer::new)
                .subscribe(
                        multiplexer -> lambda$new$2(this, multiplexer),
                        err -> lambda$new$3(this, err));
    }

    /**
     * Builds the exception encoded into the ERROR frame sent when resumption fails.
     *
     * @param j position embedded in the message
     */
    private static ConnectionErrorException errorFrameThrowable(long j) {
        // NOTE(review): ILConstants.ARRAY_CLOSE_NEWLINE is defined outside this file; it is
        // presumably the closing bracket of the message - confirm its value.
        return new ConnectionErrorException("resumption_server_pos=[" + j + ILConstants.ARRAY_CLOSE_NEWLINE);
    }

    /**
     * Produces a new connection after a connection error: subscription to the session's connection
     * source is delayed by the retry companion, then retried with {@code retry} and bounded by
     * {@code duration}.
     */
    public static /* synthetic */ Publisher<DuplexConnection> lambda$new$1(ClientRSocketSession clientRSocketSession, Retry retry, Duration duration, Throwable th) {
        logger.debug("Client session connection error. Starting new connection");
        Mono<DuplexConnection> connectionSource = clientRSocketSession.newConnection;
        // The flag is created fresh for every error, so this compareAndSet always succeeds and
        // the companion generated from the failure is always the one used as the delay.
        AtomicBoolean once = new AtomicBoolean();
        Publisher<?> subscriptionDelay = once.compareAndSet(false, true)
                ? retry.generateCompanion(Flux.just(new RetrySignal(th)))
                : Mono.empty();
        return connectionSource.delaySubscription(subscriptionDelay).retryWhen(retry).timeout(duration);
    }

    /**
     * Re-attaches the resumable connection to the freshly established connection, sends a RESUME
     * frame and passes the first frame received on the setup connection to
     * {@link #resumeWith(ByteBuf)}.
     */
    public static /* synthetic */ void lambda$new$2(final ClientRSocketSession clientRSocketSession, ClientServerInputMultiplexer clientServerInputMultiplexer) {
        clientRSocketSession.reconnect(clientServerInputMultiplexer.asClientServerConnection());
        long impliedPosition = clientRSocketSession.resumableConnection.impliedPosition();
        long position = clientRSocketSession.resumableConnection.position();
        logger.debug("Client ResumableConnection reconnected. Sending RESUME frame with state: [impliedPos: {}, pos: {}]", impliedPosition, position);
        // retain(): the token is handed to the RESUME frame while the session keeps its own
        // reference for later reconnects and for the release on close.
        ByteBuf resumeFrame = ResumeFrameCodec.encode(clientRSocketSession.allocator, clientRSocketSession.resumeToken.retain(), impliedPosition, position);
        clientRSocketSession
                .sendFrame(resumeFrame)
                .then(clientServerInputMultiplexer.asSetupConnection().receive().next())
                .subscribe(clientRSocketSession::resumeWith);
    }

    /** Invoked when the reconnect pipeline fails: logs and disposes the resumable connection. */
    public static /* synthetic */ void lambda$new$3(ClientRSocketSession clientRSocketSession, Throwable th) {
        logger.debug("Client ResumableConnection reconnect timeout");
        clientRSocketSession.resumableConnection.dispose();
    }

    /**
     * Error fallback used by {@link #resumeWith(ByteBuf)}: sends an ERROR frame carrying
     * {@code j}, runs the connection callback and then never completes.
     */
    public static /* synthetic */ Mono<Void> lambda$null$4(ClientRSocketSession clientRSocketSession, long j, Throwable th) {
        Mono<Void> errorSent = clientRSocketSession.sendFrame(ErrorFrameCodec.encode(clientRSocketSession.allocator, 0, errorFrameThrowable(j)));
        ResumableDuplexConnection connection = clientRSocketSession.resumableConnection;
        // NOTE(review): $$Lambda$QYUWX8kHmzDIjVopy1ASYnxHRC0 is a synthetic class defined outside
        // this file; presumably a Runnable bound to a method of the connection (likely dispose)
        // - confirm against its definition.
        return errorSent.then(Mono.fromRunnable(new $$Lambda$QYUWX8kHmzDIjVopy1ASYnxHRC0(connection))).then(Mono.never());
    }

    /** Reads the last received client position from a RESUME_OK frame. */
    private static long remoteImpliedPos(ByteBuf byteBuf) {
        return ResumeOkFrameCodec.lastReceivedClientPos(byteBuf);
    }

    /** Always returns {@code -1}; the frame is not inspected. */
    private static long remotePos(ByteBuf byteBuf) {
        return -1L;
    }

    /**
     * Sends a single frame over the resumable connection. Send errors are suppressed: the returned
     * {@link Mono} completes empty instead of failing.
     */
    private Mono<Void> sendFrame(ByteBuf byteBuf) {
        return this.resumableConnection.sendOne(byteBuf).onErrorResume(err -> Mono.empty());
    }

    /** Releases the resume token if one has been set. */
    private void releaseResumeToken() {
        ByteBuf token = this.resumeToken;
        // The token may never have been set if the session closes before resumeToken(ByteBuf).
        if (token != null) {
            token.release();
        }
    }

    /**
     * Sets the source subscribed to whenever a new connection is needed after a connection error.
     *
     * @return this session
     */
    @Override // io.rsocket.resume.RSocketSession
    public ClientRSocketSession continueWith(Mono<DuplexConnection> mono) {
        this.newConnection = mono;
        return this;
    }

    /** Delegates to {@link ResumableDuplexConnection#reconnect(DuplexConnection)}. */
    @Override // io.rsocket.resume.RSocketSession
    public void reconnect(DuplexConnection duplexConnection) {
        this.resumableConnection.reconnect(duplexConnection);
    }

    @Override // io.rsocket.resume.RSocketSession
    public ResumableDuplexConnection resumableConnection() {
        return this.resumableConnection;
    }

    /**
     * Stores the resume token, retaining it; the reference is released when the session closes.
     *
     * @return this session
     */
    public ClientRSocketSession resumeToken(ByteBuf byteBuf) {
        this.resumeToken = byteBuf.retain();
        return this;
    }

    /**
     * Handles the frame received in response to RESUME: reads the remote positions, releases the
     * frame and asks the resumable connection to resume. If the resume step fails, an ERROR frame
     * carrying the remote implied position is sent (see {@code lambda$null$4}).
     *
     * @param byteBuf received frame; released by this method
     * @return this session
     */
    @Override // io.rsocket.resume.RSocketSession
    public ClientRSocketSession resumeWith(ByteBuf byteBuf) {
        logger.debug("ResumeOK FRAME received");
        long remotePos = remotePos(byteBuf);
        final long remoteImpliedPos = remoteImpliedPos(byteBuf);
        byteBuf.release();
        this.resumableConnection.resume(
                remotePos,
                remoteImpliedPos,
                pos -> pos.then().onErrorResume(err -> lambda$null$4(this, remoteImpliedPos, err)));
        return this;
    }

    /** Returns the stored resume token without retaining it; may be {@code null} if never set. */
    @Override // io.rsocket.resume.RSocketSession
    public ByteBuf token() {
        return this.resumeToken;
    }
}
